【小ネタ】AWS IoTのアクション設定でKinesis Data Streamsにデータを送る時、パーティションキーに指定するnewuuid()について確認してみました
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoTでアクション設定を使用してKinesis Data Streamsにデータを送る場合、どのシャードに送るかを指定するためにパーティションキーを設定する箇所があります。
スケール目的でシャードの数を増やしても、パーティションキーを適切に設定しないと、期待通りの分散が出来ません。今回は、ここで一般に使用される newuuid() について、動作を確認してみました。
すいません、特にひねった内容ではありません。個人的に理解を深めるために、単純にドキュメントに記載されている内容をなぞっただけです。
2 検証環境
(1) Kinesis Data Streams
動作確認用のストリームは、シャード数を10に設定して作成しました。
(2) AWS IoT
AWS IoTでは、アクション設定で特定のトピックに来たデータを、先のストリームに送りますが、とりあえず、パーティションキーには、${newuuid()} を設定しています。
3 検証コード
newuuid()は、AWS IoTで利用可能な関数で、16バイトのランダムなUUIDを返すものです。
例: newuuid() = 123a4567-b89c-12d3-e456-789012345000
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-sql-functions.html
一方、パーティションキーは、データをシャード別にグループ化するためのキーです。
Amazon Kinesis Data Streams の用語と概念
実際に、ランダムなUUIDをパーティションキーとして指定した場合に、どのように分散されているのかを下記のテスト用コードで確認してみました。
(1) mqttへの送信
時系列的に順番に、1から増加する数値を送っています。
const AWS = require("aws-sdk"); const endPoint = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"; const topicName = "topic_1"; var iotdata = new AWS.IotData({ endpoint: endPoint }); const max = 10; async function main(){ for (var i=0; i < max; i++) { var params = { topic: topicName, payload: JSON.stringify({value: i}) }; await iotdata.publish(params).promise(); } } main();
(2) Kinesis Data Streamsのデータ列挙
Kinesis Data Streamsのデータ列挙のテスト用のコードです。 ストーリーム名を指定して、シャードを列挙し、それぞれのシャードの先頭から最後まで、データを取得しています。
また、最後には、各シャードのデータ数を一覧しています。
const AWS = require("aws-sdk"); const kinesis = new AWS.Kinesis(); const streamName = "testStream"; async function main(){ // シャードIDの取得 const stream = await kinesis.describeStream({StreamName:streamName}).promise(); const results = stream.StreamDescription.Shards; const shardIds = results.map( r => r.ShardId ); let logs = []; let buffer = []; // シェードごとのデータ数をカウントする const partition = Array(shardIds.length).fill(0); // 各シャードのデータ取得 for(var i=0;i<shardIds.length;i++){ const shardId = shardIds[i]; console.log(`[${shardId}]`); // イテレータの取得 const params = { ShardId: shardId, ShardIteratorType: 'TRIM_HORIZON', // データの先頭から StreamName: streamName, }; const result = await kinesis.getShardIterator(params).promise(); let shardIterator = result.ShardIterator; while(true) { const data = await kinesis.getRecords({ ShardIterator: shardIterator, Limit: 1000 }).promise(); if (data.Records.length==0) { break; } data.Records.forEach( r => { console.log(r.ApproximateArrivalTimestamp.getTime() + ',' + r.PartitionKey + ', ' + r.SequenceNumber + ', ' + r.Data); buffer.push(r); partition[i]++; } ); shardIterator = data.NextShardIterator; } } console.log(`-------------------------------------`); partition.forEach( (n,i) => { console.log(`[${i}] : ${n}`) }); console.log(`-------------------------------------`); buffer.sort( (a,b) => { return a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime(); // return a.SequenceNumber - b.SequenceNumber; }); buffer.forEach( b => { console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data); }) } main();
4 10件の送信
先のコードで、10件のデータを送信し、Kinesis Data Streamsに溜まったデータを列挙したところ、以下のようになりました。
[shardId-000000000000] 1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4} [shardId-000000000001] 1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3} 1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6} [shardId-000000000002] 1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0} [shardId-000000000003] [shardId-000000000004] [shardId-000000000005] [shardId-000000000006] 1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2} 1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8} 1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9} [shardId-000000000007] 1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1} 1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5} [shardId-000000000008] [shardId-000000000009] 1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7} ------------------------------------- [0] : 1 [1] : 2 [2] : 1 [3] : 0 [4] : 0 [5] : 0 [6] : 3 [7] : 2 [8] : 0 [9] : 1
送信した10件がすべて入っていますが、1件ずつに分散されている訳ではありません。また、その順番もバラバラです。
これは、ユニークに生成したUUIDが、パーティションキーとして評価される時に、こんな感じで分散されることを表していると思います。
5 パーティションキー
指定されたパーティションキーは、MD5ハッシュ関数で128ビットの整数値にマッピングして、格納先のシャードが決定されるとなっています。
Amazon Kinesis Data Streams の用語と概念
下記のコードで、パーティションキー(UUID)がシャードのグルーピングに使用されるようすを確認してみました。
const uuidList = [] は、先の出力の各データのUUIDをコピーしたものです。128bitのHash値の最大値である FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF をシャードの数(今回は、10個)で割り、グルーピングされる数値の範囲を算出して、それぞれのUUIDのハッシュ値が、どのグループに該当するかを確認しています。
var md5 = require('md5'); const uuidList = [ "d45977fb-18b9-43c3-b8fe-8f229f8953f2", "57c97b06-adfb-491a-aa7d-08b79cdc140e", "fd7adf21-b38d-4888-b15d-f2fe2fc2d47e", "dbbba199-cb9e-4e31-8a4f-897924574a4f", "bd1bbe91-34e6-4c23-b75b-8073f06ea364", "e277abf6-f494-46fe-aeb9-d448e623a04a", "613834cc-effb-436b-917e-26dd1a29268c", "90c67824-764b-4ac5-86c8-91021abd96d9", "96bdbaaa-6927-4a09-bca4-77aa4f363934", "b4b4d4d3-bda0-4157-b38f-686e59192074" ]; // 128bitのHash値の最大値 const max= parseInt("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", 16); // シャード数 const shard = 10; // 境界値 const border = max/shard; // パーティションへの配置のカウント const partition = Array(shard).fill(0); uuidList.forEach(uuid => { // UUIDのハッシュ値を数値で取得 const hash = parseInt(md5(uuid), 16); // 境界値から何番目に配置されるかを計算する const index = parseInt(hash/(border)); // カウントする partition[index]++; }) partition.forEach( (n,i) => { console.log(`[${i}] : ${n}`) });
結果は、下記のとおりで、最初の結果と一致している事が確認できます。
[0] : 1 [1] : 2 [2] : 1 [3] : 0 [4] : 0 [5] : 0 [6] : 3 [7] : 2 [8] : 0 [9] : 1
6 固定のパーティションキー
ちなみに、パーティションキーを固定値にしてしまうと、検証結果は、以下のとおりとなり、1つのシャードに全部入ってしまうことが確認できます。
[shardId-000000000000] [shardId-000000000001] [shardId-000000000002] [shardId-000000000003] [shardId-000000000004] [shardId-000000000005] [shardId-000000000006] [shardId-000000000007] 1580344794097,1, 49603743154182610623203681167485264263685091409505812594, {"value":0} 1580344794232,1, 49603743154182610623203681167487682115324320667855224946, {"value":1} 1580344794399,1, 49603743154182610623203681167488891041143935297029931122, {"value":2} 1580344794513,1, 49603743154182610623203681167490099966963549926204637298, {"value":3} 1580344794676,1, 49603743154182610623203681167491308892783164555379343474, {"value":4} 1580344794781,1, 49603743154182610623203681167492517818602779184554049650, {"value":5} 1580344794938,1, 49603743154182610623203681167494935670242008442903462002, {"value":6} 1580344795094,1, 49603743154182610623203681167496144596061623140797644914, {"value":7} 1580344795201,1, 49603743154182610623203681167497353521881237769972351090, {"value":8} 1580344795317,1, 49603743154182610623203681167498562447700852399147057266, {"value":9} [shardId-000000000008] [shardId-000000000009] ------------------------------------- [0] : 0 [1] : 0 [2] : 0 [3] : 0 [4] : 0 [5] : 0 [6] : 0 [7] : 10 [8] : 0 [9] : 0
7 ソート
時系列で生成したデータなので、各シャードのデータをまとめて、タイムスタンプでソートすると、順番に並んだデータが取得できます。
ソート
buffer.sort( (a,b) => { return a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime(); }); buffer.forEach( b => { console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data); })
結果出力
------------------------------------- 1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0} 1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1} 1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2} 1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3} 1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4} 1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5} 1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6} 1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7} 1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8} 1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
ちなみに、シーケンス番号でソートした場合は、時系列の並びになっていませんでした。
ソート
buffer.sort( (a,b) => { return a.SequenceNumber - b.SequenceNumber; }); buffer.forEach( b => { console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data); })
結果出力
------------------------------------- 1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4} 1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3} 1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6} 1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0} 1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2} 1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8} 1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9} 1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1} 1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5} 1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
8 大量データ
パーティションキーを ${newuuid()} に戻して、10000件のデータを送ってみた際の、シャードへの分散状況です。
各シャードに1000件づつ分散されるのが理想値ではありますが・・・充分に分散されているとは言えそうです。
const arr = Array(10000).fill(0); await Promise.all(arr.map( async (_v,i) => { var params = { topic: topicName, payload: JSON.stringify({value: i}) }; await iotdata.publish(params).promise(); console.log(`send ${i}`) })) console.log("done");
------------------------------------- [0] : 1025 [1] : 990 [2] : 948 [3] : 1061 [4] : 984 [5] : 1012 [6] : 999 [7] : 972 [8] : 981 [9] : 1028 -------------------------------------
9 最後に
今回は、AWS IoTからKinesis Data Streamsにデータを送る場合の、パーティションキー( newuuid() )について、少し確認してみました。
実際に色々試しみて、ちょっと理解進んだような気がしました。